Skip to content

feat: Adding Apache Arrow FlightShim for connector federation in C++ workers#26369

Open
BryanCutler wants to merge 2 commits intoprestodb:masterfrom
BryanCutler:bjc-fed-flight-server
Open

feat: Adding Apache Arrow FlightShim for connector federation in C++ workers#26369
BryanCutler wants to merge 2 commits intoprestodb:masterfrom
BryanCutler:bjc-fed-flight-server

Conversation

@BryanCutler
Copy link
Copy Markdown
Contributor

@BryanCutler BryanCutler commented Oct 20, 2025

Description

This adds a FlightShim module for connector federation. This includes an Apache Arrow Flight server that can load Presto Java connectors, and provide a record batch stream from the given connector split.

The FlightShim server is designed to work with a native arrow connector, that will use a Flight client to forward the connector split to the server and process the record batch stream. See related PR at #26404

RFC: prestodb/rfcs#46

Motivation and Context

This provides a path to connector federation with native workers

Impact

No API changes required.

Test Plan

Included unit tests for the FlightShim server functionality. The followup native connector #26404 includes e2e testing between the coordinator - native FlightShim connector - FlightShim server - multiple data sources. We have also tested internally in a full platform environment.

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.
  • If adding new dependencies, verified they have an OpenSSF Scorecard score of 5.0 or higher (or obtained explicit TSC approval for lower scores).

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* Adding FlightShim server module for connector federation.

@prestodb-ci prestodb-ci added the from:IBM PR from IBM label Oct 20, 2025
@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai bot commented Oct 20, 2025

Reviewer's Guide

Introduces a new presto-flight-shim module that runs an Apache Arrow Flight server to expose Presto JDBC-based connectors to C++ workers, including Arrow page-to-vector conversion, plugin/catalog bootstrapping, request wiring, configuration, and integration tests against multiple JDBC connectors.

Sequence diagram for FlightShim GetStream request handling

sequenceDiagram
    actor CppWorker
    participant FlightClient
    participant FlightServer as FlightServer_process
    participant Producer as FlightShimProducer
    participant PluginMgr as FlightShimPluginManager
    participant ConnectorHolder
    participant Connector
    participant BatchSource as ArrowBatchSource

    CppWorker->>FlightClient: DoGet(Ticket:FlightShimRequest_json)
    FlightClient->>FlightServer: DoGet(Ticket)
    FlightServer->>Producer: getStream(CallContext,Ticket,Listener)
    Producer->>Producer: submit runGetStreamAsync
    activate Producer
    Producer->>Producer: REQUEST_JSON_CODEC.fromJson(Ticket.bytes)

    Producer->>PluginMgr: getConnector(connectorId)
    activate PluginMgr
    PluginMgr->>PluginMgr: lookup CatalogPropertiesHolder
    PluginMgr->>ConnectorHolder: create_if_absent(ConnectorFactory,config,context)
    ConnectorHolder-->>PluginMgr: ConnectorHolder
    deactivate PluginMgr

    Producer-->>ConnectorHolder: getCodecSplit(), getCodecColumnHandle()
    ConnectorHolder-->>Producer: JsonCodec<ConnectorSplit>, JsonCodec<ColumnHandle>
    Producer->>Producer: deserialize ConnectorSplit, ColumnHandles

    Producer->>ConnectorHolder: getColumnMetadata(ColumnHandle)
    ConnectorHolder->>ConnectorHolder: reflect getColumnMetadata()/toColumnMetadata()
    ConnectorHolder-->>Producer: ColumnMetadata list

    Producer->>Connector: beginTransaction()
    Connector-->>Producer: ConnectorTransactionHandle

    Producer->>Connector: getPageSourceProvider()/getRecordSetProvider()
    Connector-->>Producer: ConnectorPageSourceProvider or ConnectorRecordSetProvider
    Producer->>Producer: create ConnectorSession(Session)
    Producer->>Connector: createPageSource(...)
    Connector-->>Producer: ConnectorPageSource

    Producer->>BatchSource: new ArrowBatchSource(allocator,columns,pageSource,maxRowsPerBatch)
    activate BatchSource
    Producer->>Listener: start(VectorSchemaRoot)

    loop for each batch
        Producer->>BatchSource: nextBatch()
        BatchSource->>ConnectorPageSource: getNextPage()
        ConnectorPageSource-->>BatchSource: Page
        BatchSource->>BatchSource: write values via ArrowShimWriter
        BatchSource-->>Producer: hasMore
        alt client_ready
            Producer->>Listener: putNext()
        else client_not_ready
            Producer->>Producer: backpressureStrategy.waitForListener()
        end
    end

    Producer->>Listener: completed()
    deactivate BatchSource
    deactivate Producer
    Listener-->>FlightClient: stream complete
    FlightClient-->>CppWorker: Arrow_record_batches
Loading

Class diagram for ArrowBatchSource and Arrow shim writers

classDiagram
    class ArrowBatchSource {
        -List~ColumnMetadata~ columns
        -VectorSchemaRoot root
        -List~ArrowShimWriter~ writers
        -int maxRowsPerBatch
        -ConnectorPageSource pageSource
        -Page currentPage
        -int currentPosition
        +ArrowBatchSource(BufferAllocator allocator, List~ColumnMetadata~ columns, ConnectorPageSource pageSource, int maxRowsPerBatch)
        +VectorSchemaRoot getVectorSchemaRoot()
        +boolean nextBatch()
        +void close()
        -static VectorSchemaRoot createVectorSchemaRoot(BufferAllocator allocator, List~ColumnMetadata~ columns)
        -static ArrowType prestoToArrowType(Type type)
        -static List~ArrowShimWriter~ createArrowWriters(VectorSchemaRoot root)
        -static ArrowShimWriter createArrowWriter(FieldVector vector)
        -static void allocateVectorCapacity(VectorSchemaRoot root, int capacity)
    }

    class ArrowShimWriter {
        <<abstract>>
        +void writeNull(int index)
        +void writeBoolean(int index, boolean value)
        +void writeLong(int index, long value)
        +void writeDouble(int index, double value)
        +void writeSlice(int index, Slice value, int offset, int length)
    }

    class ArrowFixedWidthShimWriter {
        <<abstract>>
        +BaseFixedWidthVector getVector()
        +void writeNull(int index)
    }

    class ArrowShimBitWriter {
        -BitVector vector
        +ArrowShimBitWriter(BitVector vector)
        +BitVector getVector()
        +void writeBoolean(int index, boolean value)
    }

    class ArrowShimTinyIntWriter {
        -TinyIntVector vector
        +ArrowShimTinyIntWriter(TinyIntVector vector)
        +TinyIntVector getVector()
        +void writeLong(int index, long value)
    }

    class ArrowShimSmallIntWriter {
        -SmallIntVector vector
        +ArrowShimSmallIntWriter(SmallIntVector vector)
        +SmallIntVector getVector()
        +void writeLong(int index, long value)
    }

    class ArrowShimIntWriter {
        -IntVector vector
        +ArrowShimIntWriter(IntVector vector)
        +IntVector getVector()
        +void writeLong(int index, long value)
    }

    class ArrowShimLongWriter {
        -BigIntVector vector
        +ArrowShimLongWriter(BigIntVector vector)
        +BigIntVector getVector()
        +void writeLong(int index, long value)
    }

    class ArrowShimRealWriter {
        -Float4Vector vector
        +ArrowShimRealWriter(Float4Vector vector)
        +Float4Vector getVector()
        +void writeLong(int index, long value)
    }

    class ArrowShimDoubleWriter {
        -Float8Vector vector
        +ArrowShimDoubleWriter(Float8Vector vector)
        +Float8Vector getVector()
        +void writeDouble(int index, double value)
    }

    class ArrowShimDecimalWriter {
        -DecimalVector vector
        +ArrowShimDecimalWriter(DecimalVector vector)
        +DecimalVector getVector()
        +void writeLong(int index, long value)
        +void writeSlice(int index, Slice value, int offset, int length)
    }

    class ArrowShimVariableWidthWriter {
        -BaseVariableWidthVector vector
        +ArrowShimVariableWidthWriter(BaseVariableWidthVector vector)
        +void writeNull(int index)
        +void writeSlice(int index, Slice value, int offset, int length)
    }

    class ArrowShimDateWriter {
        -DateDayVector vector
        +ArrowShimDateWriter(DateDayVector vector)
        +DateDayVector getVector()
        +void writeLong(int index, long value)
    }

    class ArrowShimTimeWriter {
        -TimeMilliVector vector
        +ArrowShimTimeWriter(TimeMilliVector vector)
        +TimeMilliVector getVector()
        +void writeLong(int index, long value)
    }

    class ArrowShimTimeStampWriter {
        -TimeStampVector vector
        +ArrowShimTimeStampWriter(TimeStampVector vector)
        +TimeStampVector getVector()
        +void writeLong(int index, long value)
    }

    ArrowBatchSource --> VectorSchemaRoot
    ArrowBatchSource --> ConnectorPageSource
    ArrowBatchSource --> ArrowShimWriter

    ArrowShimWriter <|-- ArrowFixedWidthShimWriter
    ArrowFixedWidthShimWriter <|-- ArrowShimBitWriter
    ArrowFixedWidthShimWriter <|-- ArrowShimTinyIntWriter
    ArrowFixedWidthShimWriter <|-- ArrowShimSmallIntWriter
    ArrowFixedWidthShimWriter <|-- ArrowShimIntWriter
    ArrowFixedWidthShimWriter <|-- ArrowShimLongWriter
    ArrowFixedWidthShimWriter <|-- ArrowShimRealWriter
    ArrowFixedWidthShimWriter <|-- ArrowShimDoubleWriter
    ArrowFixedWidthShimWriter <|-- ArrowShimDecimalWriter
    ArrowFixedWidthShimWriter <|-- ArrowShimDateWriter
    ArrowFixedWidthShimWriter <|-- ArrowShimTimeWriter
    ArrowFixedWidthShimWriter <|-- ArrowShimTimeStampWriter

    ArrowShimWriter <|-- ArrowShimVariableWidthWriter
Loading

Class diagram for FlightShim server, configuration, and plugin management

classDiagram
    class FlightShimConfig {
        +String CONFIG_PREFIX
        -String serverName
        -Integer serverPort
        -String serverSSLCertificateFile
        -String serverSSLKeyFile
        -String clientSSLCertificateFile
        -String clientSSLKeyFile
        -boolean serverSslEnabled
        -int maxRowsPerBatch
        -int readSplitThreadPoolSize
        +String getServerName()
        +FlightShimConfig setServerName(String serverName)
        +Integer getServerPort()
        +FlightShimConfig setServerPort(Integer serverPort)
        +boolean getServerSslEnabled()
        +FlightShimConfig setServerSslEnabled(boolean serverSslEnabled)
        +String getServerSSLCertificateFile()
        +FlightShimConfig setServerSSLCertificateFile(String serverSSLCertificateFile)
        +String getServerSSLKeyFile()
        +FlightShimConfig setServerSSLKeyFile(String serverSSLKeyFile)
        +String getClientSSLCertificateFile()
        +FlightShimConfig setClientSSLCertificateFile(String clientSSLCertificateFile)
        +String getClientSSLKeyFile()
        +FlightShimConfig setClientSSLKeyFile(String clientSSLKeyFile)
        +int getMaxRowsPerBatch()
        +FlightShimConfig setMaxRowsPerBatch(int maxRowsPerBatch)
        +FlightShimConfig setReadSplitThreadPoolSize(int readSplitThreadPoolSize)
        +int getReadSplitThreadPoolSize()
    }

    class FlightShimRequest {
        -String connectorId
        -byte[] splitBytes
        -List~byte[]~ columnHandlesBytes
        +FlightShimRequest(String connectorId, byte[] splitBytes, List~byte[]~ columnHandlesBytes)
        +String getConnectorId()
        +byte[] getSplitBytes()
        +List~byte[]~ getColumnHandlesBytes()
    }

    class FlightShimProducer {
        -BufferAllocator allocator
        -FlightShimPluginManager pluginManager
        -FlightShimConfig config
        -ExecutorService shimExecutor
        +FlightShimProducer(BufferAllocator allocator, FlightShimPluginManager pluginManager, FlightShimConfig config, ExecutorService shimExecutor)
        +void getStream(CallContext context, Ticket ticket, ServerStreamListener listener)
        -void runGetStreamAsync(CallContext context, Ticket ticket, ServerStreamListener listener)
        -ConnectorPageSourceProvider getConnectorPageSourceProvider(Connector connector, ConnectorId connectorId)
        +void close()
    }

    class FlightShimPluginManager {
        -File installedPluginsDir
        -List~String~ plugins
        -ArtifactResolver resolver
        -PluginInstaller pluginInstaller
        -File catalogConfigurationDir
        -Set~String~ disabledCatalogs
        -Map~String,ConnectorFactory~ connectorFactories
        -Map~String,ConnectorHolder~ connectors
        -Map~String,CatalogPropertiesHolder~ catalogPropertiesMap
        -FunctionAndTypeManager functionAndTypeManager
        -TypeDeserializer typeDeserializer
        -BlockEncodingManager blockEncodingManager
        -ProcedureRegistry procedureRegistry
        +FlightShimPluginManager(PluginManagerConfig pluginManagerConfig, StaticCatalogStoreConfig catalogStoreConfig, FunctionAndTypeManager functionAndTypeManager, TypeDeserializer typeDeserializer, BlockEncodingManager blockEncodingManager, ProcedureRegistry procedureRegistry)
        +void stop()
        +void loadPlugins()
        +void loadCatalogs()
        +ConnectorHolder getConnector(String connectorId)
    }

    class CatalogPropertiesHolder {
        -String connectorName
        -ImmutableMap~String,String~ catalogProperties
        +CatalogPropertiesHolder(String connectorName, ImmutableMap~String,String~ catalogProperties)
        +String getConnectorName()
        +ImmutableMap~String,String~ getCatalogProperties()
    }

    class ConnectorHolder {
        -Connector connector
        -JsonCodec~ConnectorSplit~ codecSplit
        -JsonCodec~ColumnHandle~ codecColumnHandle
        -Method getColumnMetadataMethod
        +ConnectorHolder(Connector connector, ConnectorHandleResolver resolver, TypeDeserializer typeDeserializer, BlockEncodingManager blockEncodingManager)
        +Connector getConnector()
        +JsonCodec~ConnectorSplit~ getCodecSplit()
        +JsonCodec~ColumnHandle~ getCodecColumnHandle()
        +ColumnMetadata getColumnMetadata(ColumnHandle handle)
    }

    class FlightShimModule {
        +FlightShimModule()
        +void setup(Binder binder)
        +ExecutorService createFlightShimServerExecutor(FlightShimConfig config)
    }

    class FlightShimServer {
        -FlightShimServer()
        +static Injector initialize(Map~String,String~ config, Module[] extraModules)
        +static FlightServer start(Injector injector, FlightServer_Builder builder)
        +static void main(String[] args)
    }

    class FlightShimServerExecutionMBean {
        -ThreadPoolExecutorMBean executorMBean
        +FlightShimServerExecutionMBean(ExecutorService executor)
        +ThreadPoolExecutorMBean getExecutor()
    }

    class ForFlightShimServer {
        <<annotation>>
    }

    FlightShimProducer --> FlightShimPluginManager
    FlightShimProducer --> FlightShimConfig
    FlightShimProducer --> ArrowBatchSource
    FlightShimProducer --> ConnectorHolder

    FlightShimPluginManager o--> CatalogPropertiesHolder
    FlightShimPluginManager o--> ConnectorHolder

    ConnectorHolder --> Connector

    FlightShimModule --> FlightShimConfig
    FlightShimModule --> FlightShimPluginManager
    FlightShimModule --> FlightShimProducer
    FlightShimModule --> FlightShimServerExecutionMBean
    FlightShimModule --> ForFlightShimServer

    FlightShimServer --> FlightShimModule
    FlightShimServer --> FlightShimConfig

    FlightShimServerExecutionMBean --> ForFlightShimServer

    ForFlightShimServer <.. FlightShimProducer
    ForFlightShimServer <.. FlightShimModule
    ForFlightShimServer <.. FlightShimServerExecutionMBean
Loading

File-Level Changes

Change Details Files
Add ArrowBatchSource to convert Presto ConnectorPageSource pages into Arrow VectorSchemaRoot record batches with per-type writers.
  • Create ArrowBatchSource that iterates ConnectorPageSource pages and fills Arrow vectors up to a configurable max-rows-per-batch.
  • Map Presto scalar types (boolean, integer, bigint, real, double, decimal, varbinary, char/varchar, date, time, timestamp, timestamp with time zone) to corresponding ArrowTypes.
  • Implement ArrowShimWriter hierarchy for fixed-width, variable-width, decimal, date, time, and timestamp vectors, including null handling and decimal unscaled-value decoding.
  • Allocate and clear Arrow vector capacity per batch and manage lifecycle of VectorSchemaRoot and underlying page source.
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/ArrowBatchSource.java
Introduce FlightShimPluginManager to dynamically load Presto connector plugins/catalogs and provide ConnectorHolder utilities for Flight shim.
  • Wrap PluginManagerUtil to install plugins from configured directories and plugin.bundles, tracking ConnectorFactories and loaded connectors.
  • Load catalog .properties or programmatic catalog definitions into an internal CatalogPropertiesHolder map, honoring disabled catalogs.
  • On-demand creation of connectors with ConnectorContextInstance, MetadataManager, row-expression services, stats calculators, and BlockEncodingManager.
  • Provide ConnectorHolder that exposes JSON codecs for ConnectorSplit and ColumnHandle and uses reflection to obtain ColumnMetadata from handles.
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimPluginManager.java
Add the presto-flight-shim Maven module with Arrow Flight server dependencies, test-only JDBC connectors, and build plugin configuration.
  • Register presto-flight-shim as a new module in the root pom.xml.
  • Define presto-flight-shim/pom.xml with Arrow memory/vector/flight-core, Airlift/Presto core dependencies, JDBC connector modules, Testcontainers, and duplicate-finder/enforcer configuration.
  • Configure Java 17 profile to open java.nio module to Arrow for tests and adjust test JVM arguments.
pom.xml
presto-flight-shim/pom.xml
Implement FlightShimProducer, FlightShimServer, and FlightShimModule to run a TLS-enabled Arrow Flight server that reads connector splits and streams Arrow batches to clients.
  • Implement FlightShimProducer as a NoOpFlightProducer subclass that deserializes FlightShimRequest tickets, constructs ConnectorPageSource from the target connector, and streams Arrow batches using ArrowBatchSource with backpressure support.
  • Add FlightShimConfig for server host/port, TLS/mTLS certificate paths, batch sizing, and read-thread pool sizing, with validation and configuration binding.
  • Create FlightShimModule wiring BufferAllocator, FlightShimPluginManager, metadata/type/block components, NoOpTransactionManager, and a dedicated ExecutorService annotated with @ForFlightShimServer, plus a JMX-exposed ThreadPoolExecutorMBean.
  • Add FlightShimServer bootstrap/entrypoint that initializes Guice, loads plugins/catalogs, configures FlightServer (including TLS and optional client cert verification), and starts the server using the configured executor and producer.
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimProducer.java
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimConfig.java
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimServer.java
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimModule.java
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimServerExecutionMBean.java
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/ForFlightShimServer.java
presto-flight-shim/etc/flightshim.properties
presto-flight-shim/etc/log.properties
Define FlightShimRequest protocol object for Flight tickets and add end-to-end tests validating shim behavior against PostgreSQL, MySQL, and SingleStore connectors.
  • Introduce FlightShimRequest DTO carrying connectorId, serialized split, and serialized ColumnHandles, with JSON serde via Airlift.
  • Add AbstractTestFlightShimBase to start an embedded FlightShim server with TLS, configure plugin bundles/catalogs, and build FlightShimRequest instances for tpch.lineitem splits and columns.
  • Add AbstractTestFlightShimQueries to execute SQL via a Flight client by mapping selected columns to JdbcColumnHandles, reading Arrow Flight streams, rebuilding Presto Pages, and comparing to ExpectedQueryRunner results.
  • Implement concrete tests for PostgreSQL, MySQL, and SingleStore connectors plus low-level producer tests including cancellation, tuple-domain/additional-predicate splits, and mTLS client auth, and config/request JSON round-trip tests.
  • Include test certificates (server, client, CA) and JSON split fixtures for tuple-domain and additional-predicate cases.
presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimRequest.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/AbstractTestFlightShimBase.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/AbstractTestFlightShimQueries.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimProducer.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimPostgres.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimMySql.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimSingleStore.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimConfig.java
presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimRequest.java
presto-flight-shim/src/test/resources/certs/ca.crt
presto-flight-shim/src/test/resources/certs/client.crt
presto-flight-shim/src/test/resources/certs/client.key
presto-flight-shim/src/test/resources/certs/server.crt
presto-flight-shim/src/test/resources/certs/server.key
presto-flight-shim/src/test/resources/split_additional_predicate.json
presto-flight-shim/src/test/resources/split_tuple_domain.json

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New security issues found

@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch 2 times, most recently from 8591906 to 99225b1 Compare November 5, 2025 02:30
sourcery-ai[bot]
sourcery-ai bot previously requested changes Nov 5, 2025
Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New security issues found

@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch 2 times, most recently from 1f6d1f4 to f65e31d Compare November 5, 2025 19:25
@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch 3 times, most recently from b877acd to 1757683 Compare November 20, 2025 22:10
@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch 2 times, most recently from e72cc43 to a44937f Compare December 12, 2025 19:28
@BryanCutler BryanCutler changed the title WIP feat(flight-shim): Adding Apache Arrow FlightShim for federation in C++ workers feat(flight-shim): Adding Apache Arrow FlightShim for federation in C++ workers Dec 12, 2025
@BryanCutler BryanCutler changed the title feat(flight-shim): Adding Apache Arrow FlightShim for federation in C++ workers feat: Adding Apache Arrow FlightShim for connector federation in C++ workers Dec 12, 2025
@BryanCutler BryanCutler marked this pull request as ready for review December 12, 2025 19:30
@BryanCutler BryanCutler requested a review from a team as a code owner December 12, 2025 19:30
@prestodb-ci prestodb-ci requested review from a team, imsayari404 and jp-sivaprasad and removed request for a team December 12, 2025 19:30
Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and found some issues that need to be addressed.

Blocking issues:

  • Identified a Private Key, which may compromise cryptographic security and sensitive data encryption. (link)
  • Identified a Private Key, which may compromise cryptographic security and sensitive data encryption. (link)

General comments:

  • ArrowBatchSource’s time-related writers (e.g., ArrowShimTimeWriter and ArrowShimTimeStampWriter) cast the Presto long value to int before writing to Arrow vectors; given Presto’s time/timestamp are typically higher-precision longs, consider confirming units and avoiding truncation/overflow by converting explicitly to the expected Arrow representation.
  • FlightShimPluginManager.getConnector stores connectors in the connectors map keyed by connector name rather than catalog/connectorId, which means multiple catalogs using the same connector type will share a single Connector instance and config; consider keying by catalogName/connectorId so per-catalog properties are respected.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- ArrowBatchSource’s time-related writers (e.g., ArrowShimTimeWriter and ArrowShimTimeStampWriter) cast the Presto `long` value to `int` before writing to Arrow vectors; given Presto’s time/timestamp are typically higher-precision longs, consider confirming units and avoiding truncation/overflow by converting explicitly to the expected Arrow representation.
- FlightShimPluginManager.getConnector stores connectors in the `connectors` map keyed by connector *name* rather than catalog/connectorId, which means multiple catalogs using the same connector type will share a single Connector instance and config; consider keying by catalogName/connectorId so per-catalog properties are respected.

## Individual Comments

### Comment 1
<location> `presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimPluginManager.java:275` </location>
<code_context>
+        final ImmutableMap<String, String> config = catalogPropertiesHolder.getCatalogProperties();
+
+        // Create connector instances from factories as needed
+        return connectors.computeIfAbsent(catalogPropertiesHolder.getConnectorName(), name -> {
+            log.debug("Loading connector: %s", connectorId);
+            ConnectorFactory factory = connectorFactories.get(name);
</code_context>

<issue_to_address>
**issue (bug_risk):** Connector cache keying by connector name can mix configurations from different catalogs

`connectors` is cached by `connectorName`, while `catalogPropertiesMap`/`getConnector` are keyed by `catalogName` (`connectorId`). If two catalogs share a connector name (e.g., multiple `mysql` catalogs with different configs), they will incorrectly share the same `ConnectorHolder` and configuration. Please change the cache to include the catalog name (e.g., key by `connectorId`) so each catalog gets its own connector instance and config.
</issue_to_address>

### Comment 2
<location> `presto-flight-shim/src/main/java/com/facebook/presto/flightshim/ArrowBatchSource.java:169-178` </location>
<code_context>
+            this.vector = vector;
+        }
+
+        @Override
+        public BitVector getVector()
+        {
+            return vector;
+        }
+
+        @Override
+        public void writeBoolean(int index, boolean value)
+        {
+            vector.set(index, value ? 1 : 0);
+        }
+    }
+
+    private static class ArrowShimTinyIntWriter
+            extends ArrowFixedWidthShimWriter
+    {
+        private final TinyIntVector vector;
+
+        public ArrowShimTinyIntWriter(TinyIntVector vector)
+        {
+            this.vector = vector;
+        }
+
+        @Override
+        public TinyIntVector getVector()
+        {
+            return vector;
+        }
+
+        @Override
+        public void writeLong(int index, long value)
+        {
+            vector.set(index, (int) value);
</code_context>

<issue_to_address>
**issue (bug_risk):** Timestamp values are truncated to 32-bit when writing to TIMESTAMPMILLI Arrow vector

In `ArrowShimTimeStampWriter.writeLong`, `long value` is cast to `int` before `vector.set(index, (int) value)`. For `TIMESTAMPMILLI`, Arrow requires a 64-bit millis-since-epoch value, so this truncates and corrupts most timestamps and may overflow silently. Please use the 64-bit setter on the appropriate `TimeStamp*Vector` and confirm `value` is in the expected units.
</issue_to_address>

### Comment 3
<location> `presto-flight-shim/src/main/java/com/facebook/presto/flightshim/FlightShimProducer.java:135-136` </location>
<code_context>
+                listener.start(batchSource.getVectorSchemaRoot());
+                while (batchSource.nextBatch()) {
+                    BackpressureStrategy.WaitResult waitResult;
+                    while ((waitResult = backpressureStrategy.waitForListener(CLIENT_POLL_TIME)) == BackpressureStrategy.WaitResult.TIMEOUT) {
+                        log.info(format("Waiting for client to read from connector %s", request.getConnectorId()));
+                    }
+                    if (waitResult != BackpressureStrategy.WaitResult.READY) {
</code_context>

<issue_to_address>
**suggestion:** Backpressure logging in wait loop may be noisy at INFO level for slow clients

This logs at INFO every 5 seconds while waiting, which can spam logs for slow or idle clients and obscure more important messages. Consider lowering this to DEBUG, emitting a single INFO per stream, or logging only after a longer wait threshold.

Suggested implementation:

```java
            try (ArrowBatchSource batchSource = new ArrowBatchSource(allocator, columnsMetadata, connectorPageSource, config.getMaxRowsPerBatch())) {
                listener.setUseZeroCopy(true);
                listener.start(batchSource.getVectorSchemaRoot());
                boolean loggedClientWaitInfo = false;
                while (batchSource.nextBatch()) {
                    BackpressureStrategy.WaitResult waitResult;
                    while ((waitResult = backpressureStrategy.waitForListener(CLIENT_POLL_TIME)) == BackpressureStrategy.WaitResult.TIMEOUT) {
                        if (!loggedClientWaitInfo) {
                            log.info(format("Client is slow to read from connector %s; entering backpressure wait", request.getConnectorId()));
                            loggedClientWaitInfo = true;
                        }
                        else {
                            log.debug(format("Waiting for client to read from connector %s", request.getConnectorId()));
                        }
                    }

```

```java
                    if (waitResult != BackpressureStrategy.WaitResult.READY) {
                        log.info(format("Read stopped from connector %s due to client wait result: %s", request.getConnectorId(), waitResult));
                        break;
                    }
                    listener.putNext();
                }
                listener.completed();
            }

```
</issue_to_address>

### Comment 4
<location> `presto-flight-shim/src/test/java/com/facebook/presto/flightshim/TestFlightShimProducer.java:82-91` </location>
<code_context>
+        return new FlightShimQueryRunner();
+    }
+
+    @Test
+    public void testSelectColumns()
+    {
</code_context>

<issue_to_address>
**issue (testing):** Tests only assert row count > 0 and do not validate data correctness against Presto queries.

In `testConnectorGetStream`, `testJdbcSplitWithTupleDomain`, `testJdbcSplitWithAdditionalPredicate`, and `testWithMtls`, the `assertGreaterThan(rowCount, 0)` checks (with `// TODO compare results against query`) only verify that data returns, not that it matches Presto’s results.

Since `AbstractTestFlightShimBase` / `AbstractTestFlightShimQueries` already expose an `ExpectedQueryRunner`, you could:
- Run the equivalent SQL via the expected query runner.
- Materialize both the Flight and query results into `MaterializedResult` and compare them (optionally order-insensitively).

This would provide stronger end-to-end validation of split/column handle deserialization and Arrow conversion correctness.
</issue_to_address>

### Comment 5
<location> `presto-flight-shim/src/test/resources/certs/server.key:1-28` </location>
<code_context>
-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCtkJ+r1F8+YOVu
wWLxbGVsJKw3BEShtCsU+IXHJeaNJdBr59B/4h5WM37wOnnecmyEZTh47FXXkb5h
0xVlHES7eTAD+NPlWHufGJ9PR1kvQyZ0fyNRFXLzUID/dl7atHBtlrqE5Bhg7xqA
yPZjUjhkAZPgrT1/8+gYmbWPbw3Ba3+XRupq3Kn+EVVJi7wk4cj8jf6g1aex6sMO
kSYNsanb+JdEryevgoju+EtHgCHL6cB0eJs8PfMWiibgWLE2pkI0bdbGjTNVDDyg
ZoO8Qr/YrGvXXYqt0D7IKSUiO8bnrvZh6ITPEcQ3ePQRGEpqh8ggKaVq3RVkC3t3
QMRWGpsvAgMBAAECggEAIK9C+lNAallJ62z8inU8tjxDuAqUOBVbJZRVcPbIr1zn
HmLlpyd4Sghhh7CjYYoPuHDtTQxIcBNwlDBxb3x+zwUXzy+tC5v5j7DN01qex2Ew
XTDSAEN3Ra2r1S+/1hSztVd0oXDozFxKk+UETRjfKKoJZH6LPcy7MOLFR5EEuJ8L
0kvGdEtuNCmZ1vPBwqR3IKQS9NsB1IdTtK0g2LdtVzM3U6F173CrAx51qNeAL30j
Np+I0rfm7vYVco6nDQXJB86hzwwBnLMzmZR2E0z+JStQCjQtEJN9wp+NBnViMb8C
mZl0K/PH3ZKNEs1Aw/TsRpPu6Fc+sN6iIs2oOGiKfQKBgQDa0Wpfj+SHflLfmrRU
PplGNjWdJiyuXROqX18iNE8nAD0eRqAFdzj9yU1IW49KCzuHInEl2pP9yrDZTWXB
Bht4C+Vk13mrBE3Sc1LDrks5EhDLaaolLgx1B+JN1X2DpfuzO8WHrXR11PCzFTAp
yDSVd451CFFXMseS1V9UxCy3lQKBgQDLDrLX/0hGhG+a5RUaAE+hZk+tU9RyjYm6
/5lIoDjDwA9Yst69JCTHDApkdZ6IrjPDZrxkAQR6QwsGo+zRGkHV2wCoqR/RxcT5
RBcbe/8xL86ZKwnhAheP6ssgZeK5zOG1iLol319kXXuo6NueN+YlocmsppRvAOq7
/qMnhzXGswKBgQCpke2wHo9HnNJWK8ohGt2mtm232ZR4jvKlbgEIPac1Hw89/hcW
BT0qFqyILUQOakP4Re2PGyLiYwfHbh4zhisVTYq4Ke9EYzJ3qxzxPYlXsbNIHxtW
cqf+rVxnWtFIiwFR9TjvGrEMezcIYJwRVO/DAIJqGUcHnvdfx3B3/Qp2PQKBgQCk
y7UR37kEog8BotHRXFdEIgigHtzYa05QWYhJjN8E3yaVUfW7g03lzTvR9DNJsjeI
aiSS9NBxeV/Fb9yOh8TOjwKl3zxXvy3xLvWh9KxTev0tCeTmnBALWP6puIadTE4S
Snjoq7R7e/MUToeOjMdX20oVuMvWmuPm1u4K8o0OSQKBgQCec+QLllYXk22A8/e/
f5HhSYr161lEFFmuzKhuuy+esyCQU/KZmxQH0UqnsL3Ww4ofq42lteqyUJnriHsx
QP5FTIMKH8W+Xels1i6jCC+MVXAXraAF27dOlmKxWMN7mnElZ/7lQKmBq64wil35
sfcJA4FDxVM2Amv4KRo/w1C/zQ==
-----END PRIVATE KEY-----
</code_context>

<issue_to_address>
**security (private-key):** Identified a Private Key, which may compromise cryptographic security and sensitive data encryption.

*Source: gitleaks*
</issue_to_address>

### Comment 6
<location> `presto-flight-shim/src/test/resources/certs/client.key:1-28` </location>
<code_context>
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC8DK/RdBF6I+k+
DMGrjhMMBCnpPNwJtzJUuXYcHFYEdBnHY/rpjk/fi+7jD8bppynCZPakrDX+5VIM
zS4HBU/CHY26eR2ItiWqDoDkPAlCdgeKIGNYYEvVSuUW5YQX6fuD8PfCpCP5zK7D
JC2xTTsyEjBzD+MnIB7Tja9/22Djo2Ib2l9BEBOD+k79caPFtSqDQVdS5JLJ/P7B
eqGuFS8bEgtLwwCzRxPKkG64rXb9F0IErGwjXi/70BA24EW0uGAzzeY5Pnlx5Mul
EIjuxII/dTuoo+/uirN0wxURKSzTMyzzoJqGX9Ng9+Z+VqWFrnqckcmFcD4T0sPo
HpWZsYIHAgMBAAECggEAViw6JXFa0O3D5HtUBJmGgOsniYoqCwm4NrsGNLuHb2ME
rSpTwNNGJtqpDcQdEtVXfY1muO9xjuznPJaJkQ4ODpYcbGcz8YIGoHck+XHJjHsp
2VIeNFFsbsFzWZqzfYHrj/rMjpVJJx90tlfN2IHbroZHTXLqVPOTLL6wvZZ6P9XF
zqpWABKOaEDbenhSFFZeF5KR3NG9HSTm4YLuekumkH+QgrveDfDwXG4hAHqg836o
OF3NPaij6VlSR18nuyW0wMs/Ceu13P+GALqHmz98pFyVgHWQFryL9IccvJQDyEnt
saeG4IAVlJbZDGTnRgANLhpwBr7XhMG1aK+wmOMRgQKBgQDkcatiATlr9L8gfnHb
6pmX//AZLdXuQLXfuTvu638Brhm770noLgfIC+HIp5kCHxT2Xj5Vn+MSnYD6R6Wh
chApRKJUdsuz1iOq23YJjvsSLWCGpl9IxR7WY27uGOPIjQcOd1PRbkCq9AgUJwyn
ryca3sbYh/XQOWGLbJNIQs/S/QKBgQDSu6PVeMaS3276KblvGIvvaSAQDQWxXcC+
sA4CBmvjzx3xx5GAox/w7tcKmK/KQxNhaYy6N7xLc1YUJ9FbnT2PZQJhtP2d2Gat
Zre/+Qa+u84cR5hj9EI+B8FjW7D/psEj16KjHCds/SET6ngPM+RdB4N9daVFCurt
p0f717yiUwKBgBTJDun06I+dDkLbnmp/FwiQff0cgYmTE7lOdliPzteNSsQhypy4
i3a1Ng72yOI7h8G+43cQ/C02bYTYPgbJhRTsLMT4piIvysECBORrwQZvYIf/3U2W
ue6Rz4cUdq1Jv6meS98TZAjp+U40G1+qfSlhub/75u7SOcDg2SnLAnPVAoGBAIOO
EmRE5qpwA+b2P0Ykq89E8Hg0uPYWEiq427XV7mqkNQxoSuRkcZ9Ga0a5NRzurN2m
N+1UuB7eHMGubdtkmTa4lzkJ9T4iB09/DX0x6E0QD0bGR1M2/FefHdJ6PlAK+Q34
Ixbyj4ZRq+G0AUl0Wr7c3vBmjktA2pKMWLrW3nLzAoGBAKTl7qX6CD42gAJuT5Hp
rrXqlppVIyRvuXzXtX/Xq81IUHlBgS/t9HPyqDzmTKfxD8540kI+15bWPDHSJxiQ
ccqPaKyXhBXstDwGmlPKVzJUxk0dz5NHs+8gItUDOg78pM3siXN7vW9XBCH7mCDA
4zet/C0YCAiFVT+ipMoXy8Nc
-----END PRIVATE KEY-----
</code_context>

<issue_to_address>
**security (private-key):** Identified a Private Key, which may compromise cryptographic security and sensitive data encryption.

*Source: gitleaks*
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +169 to +178
@Override
public void close()
throws IOException
{
root.close();
pageSource.close();
}

private static VectorSchemaRoot createVectorSchemaRoot(BufferAllocator allocator, List<ColumnMetadata> columns)
{
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Timestamp values are truncated to 32-bit when writing to TIMESTAMPMILLI Arrow vector

In ArrowShimTimeStampWriter.writeLong, long value is cast to int before vector.set(index, (int) value). For TIMESTAMPMILLI, Arrow requires a 64-bit millis-since-epoch value, so this truncates and corrupts most timestamps and may overflow silently. Please use the 64-bit setter on the appropriate TimeStamp*Vector and confirm value is in the expected units.

@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch from a44937f to 609020f Compare December 18, 2025 18:02
@BryanCutler
Copy link
Copy Markdown
Contributor Author

@sourcery-ai dismiss

@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch from 609020f to a88149e Compare January 2, 2026 23:12
@BryanCutler BryanCutler dismissed sourcery-ai[bot]’s stale review January 2, 2026 23:30

private key for test purposes

@steveburnett
Copy link
Copy Markdown
Contributor

Consider if this PR needs a release note.

@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch 2 times, most recently from 32a7dd3 to 1910c22 Compare February 12, 2026 23:23
@BryanCutler BryanCutler requested a review from pdabre12 February 13, 2026 00:13
@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch 2 times, most recently from e1e5e92 to e894c23 Compare February 13, 2026 03:21
Copy link
Copy Markdown
Contributor

@pdabre12 pdabre12 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @BryanCutler.
Have some initial comments.

@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch 3 times, most recently from 47c70fa to beabbc3 Compare February 16, 2026 21:56
@BryanCutler BryanCutler requested a review from pdabre12 February 17, 2026 18:50
pdabre12
pdabre12 previously approved these changes Feb 18, 2026
@pdabre12
Copy link
Copy Markdown
Contributor

@tdcmeehan Can you please take a look?

@pdabre12 pdabre12 requested a review from tdcmeehan February 18, 2026 21:47
@tdcmeehan tdcmeehan self-assigned this Mar 4, 2026
@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch 3 times, most recently from 683faf3 to 6de1f09 Compare March 13, 2026 22:54
@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch 4 times, most recently from 5cca77f to 71c3d4b Compare March 26, 2026 01:15
Arrow Flight server that can load Presto Java connectors, and provide a
record batch stream from the given connector split.

The FlightShim server is designed to work with a native arrow connector,
that will use a Flight client to forward the connector split to the
server and process the record batch stream. See related PR at
prestodb#26404

RFC: prestodb/rfcs#46

Co-authored-by: Pratik Joseph Dabre <pdabre12@gmail.com>
@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch from 71c3d4b to fd5e354 Compare March 26, 2026 17:28
checkArgument(javaType == double.class, "Unexpected type for Float8Vector: %s", type);
return new ArrowDoubleWriter((Float8Vector) vector, new BlockPrimitiveGetter(type));
case DECIMAL:
checkArgument(type instanceof DecimalType, "Expected DecimalType but got %", type);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
checkArgument(type instanceof DecimalType, "Expected DecimalType but got %", type);
checkArgument(type instanceof DecimalType, "Expected DecimalType but got %s", type);

}
}

public static class ConnectorHolder
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand why we need this. Wouldn't it be better to retrieve the column metadata via metadata.getColumnMetadata? Why do we need to use reflection to do that?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't necessarily need ColumnMetadata but we need a way to get name, type and nullable from the ColumnHandle that's passed in the request to build the correct Arrow vectors, and there is no interface in ColumnHandle to do get this info.

I think your suggesting using MetadataManager.getColumnMetadata(Session session, TableHandle tableHandle, ColumnHandle columnHandle), but there are lot of requirements for this that aren't currently available. From a quick look it seems we would need:

  • TransactionManager - the NoOpTransactionManager is being inject right now because it's not used
  • CatalogManager - needs to keep track of the available catalogs to connector map, I'm not sure how these get registered
  • TableHandle - not currently being passed, so would need to somehow either create one from the connector or also pass this in the request

There might be more needed as well, unless I'm missing something. It seems like a lot to get column info from the handle, and would be nice if there was a dedicated interface for this. I'm not crazy about using reflection, but seemed like the simplest solution. I can look more into using the MetadataManager if you think that is a better way to do this?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to model this like Presto on Spark. There are many dependencies, but we can model them as no-op dependencies that are injected via Guice.

Couldn't the table handle be passed in from the C++ connector?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we can add a table handle as part of the request. It wasn't necessary for jdbc-based sources, so it wasn't included. It looks like we would need to add:

  • ConnectorTableHandleBytes
  • optional ConnectorTableLayoutHandleBytes
  • ConnectorTransactionHandleBytes

I'll start looking into doing it this way. cc @pdabre12

@Override
protected void setup(Binder binder)
{
binder.bind(ConnectorManager.class).toProvider(() -> null);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering why we don't just load up a ConnectorManager directly?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a lot of requirements to inject a ConnectionManager, and seemed much more simple to load the connector and create a page source directly, as that's all that was needed.

@BryanCutler BryanCutler force-pushed the bjc-fed-flight-server branch from 7bc6693 to e846242 Compare March 30, 2026 19:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

from:IBM PR from IBM

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants